- 
                Notifications
    You must be signed in to change notification settings 
- Fork 80
feat(taps): Queue parent contexts and sync child streams only when the queue is full #3058
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
| Reviewer's GuideImplements batched child-stream synchronizations by queuing parent contexts in a list and flushing them in bulk when a maximum queue size is reached or after the parent stream finishes syncing. Sequence diagram for batched child stream synchronizationsequenceDiagram
    participant ParentStream
    participant ChildContextQueue
    participant ChildStream
    loop For each parent context
        ParentStream->>ChildContextQueue: Add context to queue
        alt Queue size >= QUEUE_MAX_SIZE
            ParentStream->>ChildContextQueue: Flush queue
            ChildContextQueue->>ChildStream: Sync all queued contexts
            ChildContextQueue->>ChildContextQueue: Clear queue
        end
    end
    ParentStream->>ChildContextQueue: Flush any remaining contexts (after parent sync)
    ChildContextQueue->>ChildStream: Sync all remaining contexts
    ChildContextQueue->>ChildContextQueue: Clear queue
Class diagram for updated Stream batching logicclassDiagram
    class Stream {
        +int QUEUE_MAX_SIZE
        +list _child_context_queue
        +_sync_children(child_context)
        +_flush_child_context_queue()
    }
    Stream --> "*" Stream : child_streams
File-Level Changes
 Tips and commandsInteracting with Sourcery
 Customizing Your ExperienceAccess your dashboard to: 
 Getting Help
 | 
c7d68fb    to
    01aff07      
    Compare
  
    | Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@            Coverage Diff             @@
##             main    #3058      +/-   ##
==========================================
+ Coverage   93.59%   93.61%   +0.01%     
==========================================
  Files          69       69              
  Lines        5665     5681      +16     
  Branches      700      703       +3     
==========================================
+ Hits         5302     5318      +16     
  Misses        258      258              
  Partials      105      105              
 Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
 | 
| CodSpeed Performance ReportMerging #3058 will not alter performanceComparing  Summary
 | 
| @sourcery-ai review | 
| @sourcery-ai review | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @edgarrmondragon - I've reviewed your changes and they look great!
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `singer_sdk/streams/core.py:1368` </location>
<code_context>
+            self.name,
+        )
+
+        for context in self._child_context_queue:
+            for child_stream in self.child_streams:
+                if child_stream.selected or child_stream.has_selected_descendents:
+                    child_stream.sync(context=context)
+
</code_context>
<issue_to_address>
Consider exception handling during child stream sync.
An exception in child_stream.sync may halt queue processing. Wrap sync in try/except and log errors to enhance robustness.
</issue_to_address>
<suggested_fix>
<<<<<<< SEARCH
        for context in self._child_context_queue:
            for child_stream in self.child_streams:
                if child_stream.selected or child_stream.has_selected_descendents:
                    child_stream.sync(context=context)
=======
        for context in self._child_context_queue:
            for child_stream in self.child_streams:
                if child_stream.selected or child_stream.has_selected_descendents:
                    try:
                        child_stream.sync(context=context)
                    except Exception as exc:
                        self.logger.error(
                            "Error syncing child stream '%s' with context '%s': %s",
                            getattr(child_stream, "name", repr(child_stream)),
                            context,
                            exc,
                            exc_info=True,
                        )
>>>>>>> REPLACE
</suggested_fix>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| Documentation build overview
 Show files changed (2 files in total): 📝 2 modified | ➕ 0 added | ➖ 0 deleted
 | 
Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com>
Summary by Sourcery
Introduce batching of child stream synchronization by queueing parent contexts and flushing them when the queue reaches a configurable maximum or at the end of a parent sync.
New Features:
Enhancements:
Tests: